/* Implementation of EXPIRE (keys with fixed time to live).
 *
 * ----------------------------------------------------------------------------
 *
 * Copyright (c) 2009-2016, Redis Ltd.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * Copyright (c) Valkey Contributors
 * All rights reserved.
 * SPDX-License-Identifier: BSD-3-Clause
 */

#include "server.h"
#include "cluster.h"
#include "cluster_migrateslots.h"

/*-----------------------------------------------------------------------------
 * Incremental collection of expired keys.
 *
 * When keys are accessed they are expired on-access. However we need a
 * mechanism in order to ensure keys are eventually removed when expired even
 * if no access is performed on them.
 *----------------------------------------------------------------------------*/

/* Constants table from pow(0.98, 1) to pow(0.98, 16).
 * Lookup table used by activeExpireCycleJob() to fold several pending
 * 2%-weight running-average updates of db->avg_ttl into a single multiply
 * (see the geometric-progression derivation near its use). */
static double avg_ttl_factor[16] = {0.98, 0.9604, 0.941192, 0.922368, 0.903921, 0.885842, 0.868126, 0.850763,
                                    0.833748, 0.817073, 0.800731, 0.784717, 0.769022, 0.753642, 0.738569, 0.723798};

/* Helper for the activeExpireCycle() function: attempt to expire the
 * key-value entry 'val' from database 'db'.
 *
 * 'now' is the current time in milliseconds, passed in by the caller to
 * avoid too many gettimeofday() syscalls. 'didx' is the dict index of the
 * key inside the kvstore.
 *
 * Returns 1 if the key was found expired and was removed (with the
 * deletion propagated); 0 if the key has not expired and nothing was done.
 * On expiry, server.stat_expiredkeys is incremented (inside the delete
 * helper). */
int activeExpireCycleTryExpire(serverDb *db, robj *val, long long now, int didx) {
    long long expire_at = objectGetExpire(val);
    serverAssert(expire_at >= 0);
    if (now <= expire_at) return 0;

    enterExecutionUnit(1, 0);
    sds keyname = objectGetKey(val);
    robj *keyobj = createStringObject(keyname, sdslen(keyname));
    deleteExpiredKeyAndPropagateWithDictIndex(db, keyobj, didx);
    decrRefCount(keyobj);
    exitExecutionUnit();
    return 1;
}

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * Every expire cycle tests multiple databases: the next call will start
 * again from the next db. No more than CRON_DBS_PER_CALL databases are
 * tested at every iteration.
 *
 * The function can perform more or less work, depending on the "type"
 * argument. It can execute a "fast cycle" or a "slow cycle". The slow
 * cycle is the main way we collect expired keys: this happens with
 * the "server.hz" frequency (usually 10 hertz).
 *
 * However the slow cycle can exit for timeout, since it used too much time.
 * For this reason the function is also invoked to perform a fast cycle
 * at every event loop cycle, in the beforeSleep() function. The fast cycle
 * will try to perform less work, but will do it much more often.
 *
 * The following are the details of the two expire cycles and their stop
 * conditions:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 * The cycle will also refuse to run at all if the latest slow cycle did not
 * terminate because of a time limit condition.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
 * fast cycle, the check of every database is interrupted once the number
 * of already expired keys in the database is estimated to be lower than
 * a given percentage, in order to avoid doing too much work to gain too
 * little memory.
 *
 * The configured expire "effort" will modify the baseline parameters in
 * order to do more work in both the fast and slow expire cycles.
 */

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20    /* Keys for each DB loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000  /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25   /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we keep scanning (repeat the cycle / allow a fast cycle). */

/* Data used by the key expire kvstore scan callbacks. One instance lives on
 * the stack of activeExpireCycleJob() per database; the per-batch counters
 * (sampled/expired) are reset by the caller between scan batches. */
typedef struct {
    serverDb *db;          /* Database currently being scanned. */
    long long now;         /* Reference time (ms) used to decide expiry. */
    unsigned long sampled; /* num keys checked */
    unsigned long expired; /* num keys expired */
    long long ttl_sum;     /* sum of ttl for key with ttl not yet expired */
    int ttl_samples;       /* num keys with ttl not yet expired */

    /* Entry-specific fields (used only by the FIELDS job) */
    unsigned long max_entries;     /* Max number of entries (e.g. fields) to expire during this scan */
    bool has_more_expired_entries; /* True if the hash likely has more fields to expire */
} expireScanData;

/* Iteration state for active expiry of keys holding volatile (field-TTL)
 * items. NOTE(review): not referenced anywhere in this file — presumably
 * consumed by field-expiry code elsewhere; confirm before removing. */
typedef struct activeExpireFieldIterator {
    int current_db;       /* Index of the DB being iterated. */
    unsigned long cursor; /* Cursor for keys with volatile items (field-level TTL) */
} activeExpireFieldIterator;

/* kvstore scan callback for the KEYS active-expire job: try to expire the
 * visited entry and accumulate sampling statistics into the expireScanData
 * passed as 'privdata'. */
void expireScanCallback(void *privdata, void *entry, int didx) {
    expireScanData *data = privdata;
    robj *obj = entry;

    /* Compute the remaining TTL first: if the entry gets expired below it
     * is freed and must not be accessed afterwards. */
    long long remaining = objectGetExpire(obj) - data->now;

    if (activeExpireCycleTryExpire(data->db, obj, data->now, didx)) {
        data->expired++;
        /* Propagate the DEL command */
        postExecutionUnitOperations();
    }

    /* We want the average TTL of keys yet not expired. */
    if (remaining > 0) {
        data->ttl_sum += remaining;
        data->ttl_samples++;
    }
    data->sampled++;
}

/* kvstore scan callback for the FIELDS active-expire job: reclaim up to
 * `max_entries` expired fields from a hash with volatile fields, setting
 * `has_more_expired_entries` when more likely remain, and update the
 * sampling stats in the expireScanData passed as 'privdata'. */
void fieldExpireScanCallback(void *privdata, void *volaKey, int didx) {
    expireScanData *data = privdata;
    robj *hashobj = volaKey;
    serverAssert(hashobj);
    serverAssert(hashTypeHasVolatileFields(hashobj));

    size_t reclaimed = dbReclaimExpiredFields(hashobj, data->db, server.mstime, data->max_entries, didx);
    if (reclaimed > 0) {
        /* Hitting the per-scan cap suggests the hash still holds more
         * expired fields to reclaim on a later pass. */
        data->has_more_expired_entries = (reclaimed == data->max_entries);
        data->expired++;
    }
    data->sampled++;
}

/* Returns 1 when the given hash table is too sparse to be worth sampling,
 * 0 otherwise. */
static int expireShouldSkipTableForSamplingCb(hashtable *ht) {
    unsigned long buckets = hashtableBuckets(ht);
    if (buckets == 0) return 0;
    /* When less than 1% of the buckets are filled, sampling the key space
     * is expensive, so stop here waiting for better times... The table
     * will be resized asap. */
    long long numkeys = hashtableSize(ht);
    return (numkeys * 100 / buckets) < 1;
}

/* Returns the zero-based active expire effort level.
 *
 * The server config exposes the effort as 1-10, while internal
 * computations use a 0-based level (0-9); this helper normalizes it. */
static int activeExpireEffort(void) {
    int configured_effort = server.active_expire_effort;
    return configured_effort - 1;
}

/* Run one incremental pass of the active expire cycle for a given job type:
 * KEYS expires whole keys with a TTL (scanning db->expires), FIELDS reclaims
 * expired hash fields (scanning db->keys_with_volatile_items).
 *
 * 'cycleType' is ACTIVE_EXPIRE_CYCLE_FAST or ACTIVE_EXPIRE_CYCLE_SLOW and
 * only affects the fast-cycle early-exit policy. 'timelimit_us' is the
 * remaining time budget in microseconds. Returns the elapsed time in
 * microseconds (0 if the budget was already exhausted).
 *
 * Progress (next DB to test, time-limit flag, per-DB scan cursor) is kept
 * per job type so the work continues incrementally across calls. */
static long long activeExpireCycleJob(enum activeExpiryType jobType, int cycleType, long long timelimit_us) {
    if (timelimit_us <= 0) return 0;

    /* Higher effort tolerates fewer stale keys and samples more keys per
     * inner loop. */
    unsigned long config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE - activeExpireEffort();
    unsigned long keys_per_loop =
        ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP / 4 * activeExpireEffort();

    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    typedef struct {
        unsigned int current_db; /* Next DB to test. */
        bool timelimit_exit;     /* Time limit hit in previous call? */
    } expireState;
    static expireState _expire_state[ACTIVE_EXPIRY_TYPE_COUNT] = {0}; // [KEYS, FIELDS]
    expireState *state = &_expire_state[jobType];
    /* Per-job-type stale-percentage estimates, indexed by jobType. */
    double *expired_stale_perc[ACTIVE_EXPIRY_TYPE_COUNT] = {
        &server.stat_expired_keys_stale_perc,
        &server.stat_expired_keys_with_vola_stale_perc,
    };

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    int dbs_performed = 0;
    int time_check_mask; /* Check time limit when (i & mask) == 0, i.e. every (X+1)th of the loop. */
    monotime start = getMonotonicUs();

    if (cycleType == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit, unless the percentage of estimated stale keys is
         * too high. */
        if (!state->timelimit_exit && *expired_stale_perc[jobType] < config_cycle_acceptable_stale) return 0;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || state->timelimit_exit) dbs_per_call = server.dbnum;

    state->timelimit_exit = false;

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */
    long total_sampled = 0;
    long total_expired = 0;

    /* Stop iteration when one of the following conditions is met:
     *
     * 1) We have checked a sufficient number of databases with expiration time.
     * 2) The time limit has been exceeded.
     * 3) All databases have been traversed. */
    for (j = 0; dbs_performed < dbs_per_call && state->timelimit_exit == 0 && j < server.dbnum; j++) {
        /* Scan callback data including expired and checked count per iteration. */
        expireScanData data = {0};
        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        serverDb *db = server.db[(state->current_db++ % server.dbnum)];
        /* In case the current database is not used we can simply skip to the next database. */
        if (!db) continue;

        data.ttl_sum = 0;
        data.ttl_samples = 0;
        data.max_entries = keys_per_loop * 4;
        data.db = db;

        int db_done = 0; /* The scan of the current DB is done? */
        int update_avg_ttl_times = 0, repeat = 0;

        kvstoreScanFunction scan_cb;

        /* Select the kvstore, scan callback and time-check frequency for
         * this job type. */
        kvstore *kvs = NULL;
        if (db) {
            switch (jobType) {
            case KEYS:
                kvs = db->expires;
                scan_cb = expireScanCallback;
                time_check_mask = 0xf; /* For regular keys we can check the time condition every 16 loop iterations */
                break;
            case FIELDS:
                kvs = db->keys_with_volatile_items;
                scan_cb = fieldExpireScanCallback;
                /* For field-level keys we check the time condition every loop iteration.
                 * This is required since we might perform much more operation per single key with many fields.
                 * Limiting the number of fields we scan in each key makes the overall process less efficient.
                 * So we just perform more clock checks after each iteration. */
                time_check_mask = 0x0;
                break;
            default:
                serverPanic("Unknown active expiry job type %d.", jobType);
            }
        }

        /* Only DBs with something to expire count against dbs_per_call. */
        if (db && kvstoreSize(kvs)) dbs_performed++;

        /* Continue to expire if at the end of the cycle there are still
         * a big percentage of keys to expire, compared to the number of keys
         * we scanned. The percentage, stored in config_cycle_acceptable_stale
         * is not fixed, but depends on the configured "expire effort". */
        do {
            if (db == NULL) {
                break; /* DB not allocated since it was never used */
            }

            unsigned long num;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = kvstoreSize(kvs)) == 0) {
                db->expiry[jobType].avg_ttl = 0;
                break;
            }
            data.now = server.mstime;

            /* The main collection cycle. Scan through keys among keys
             * with an expire set, checking for expired ones. */
            data.sampled = 0;
            data.expired = 0;

            if (num > keys_per_loop) num = keys_per_loop;

            /* Here we access the low level representation of the hash table
             * for speed concerns: this makes this code coupled with dict.c,
             * but it hardly changed in ten years.
             *
             * Note that certain places of the hash table may be empty,
             * so we want also a stop condition about the number of
             * buckets that we scanned. However scanning for free buckets
             * is very fast: we are in the cache line scanning a sequential
             * array of NULL pointers, so we can scan a lot more buckets
             * than keys in the same time. */
            long max_buckets = num * 10;
            long checked_buckets = 0;

            /* Remember the sample count before this batch so we know if it
             * produced any not-yet-expired TTL samples. */
            int origin_ttl_samples = data.ttl_samples;

            while (data.sampled < num && checked_buckets < max_buckets) {
                unsigned long cursor = db->expiry[jobType].cursor;
                cursor = kvstoreScan(kvs, cursor, -1, scan_cb,
                                     expireShouldSkipTableForSamplingCb, &data);
                /* Don't advance the cursor while a hash still has expired
                 * fields pending, so the next batch revisits it. */
                if (!data.has_more_expired_entries) db->expiry[jobType].cursor = cursor;
                if (db->expiry[jobType].cursor == 0 && !data.has_more_expired_entries) {
                    db_done = 1;
                    break;
                }
                checked_buckets++;
            }
            total_expired += data.expired;
            total_sampled += data.sampled;

            /* If find keys with ttl not yet expired, we need to update the average TTL stats once. */
            if (data.ttl_samples - origin_ttl_samples > 0) update_avg_ttl_times++;

            /* We don't repeat the cycle for the current database if the db is done
             * for scanning or an acceptable number of stale keys (logically expired
             * but yet not reclaimed). */
            repeat = db_done
                         ? 0
                         : (data.sampled == 0 || (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale);

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of microseconds return to the
             * caller waiting for the other active expire cycle. */
            if ((iteration & 0xf) == 0 ||
                !repeat) { /* Update the average TTL stats every 16 iterations or about to exit. */
                /* Update the average TTL stats for this database,
                 * because this may reach the time limit. */
                if (data.ttl_samples && jobType == KEYS) {
                    /* Average TTL is calculated only for keys, as there's currently
                     * no reliable way to compute it for fields. */

                    long long avg_ttl = data.ttl_sum / data.ttl_samples;

                    /* Do a simple running average with a few samples.
                     * We just use the current estimate with a weight of 2%
                     * and the previous estimate with a weight of 98%. */
                    if (db->expiry[jobType].avg_ttl == 0) {
                        db->expiry[jobType].avg_ttl = avg_ttl;
                    } else {
                        /* The origin code is as follow.
                         * for (int i = 0; i < update_avg_ttl_times; i++) {
                         *   db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
                         * }
                         * We can convert the loop into a sum of a geometric progression.
                         * db->avg_ttl = db->avg_ttl * pow(0.98, update_avg_ttl_times) +
                         *                  avg_ttl / 50 * (pow(0.98, update_avg_ttl_times - 1) + ... + 1)
                         *             = db->avg_ttl * pow(0.98, update_avg_ttl_times) +
                         *                  avg_ttl * (1 - pow(0.98, update_avg_ttl_times))
                         *             = avg_ttl +  (db->avg_ttl - avg_ttl) * pow(0.98, update_avg_ttl_times)
                         * Notice that update_avg_ttl_times is between 1 and 16, we use a constant table
                         * to accelerate the calculation of pow(0.98, update_avg_ttl_times).*/
                        db->expiry[jobType].avg_ttl = avg_ttl + (db->expiry[jobType].avg_ttl - avg_ttl) * avg_ttl_factor[update_avg_ttl_times - 1];
                    }
                    update_avg_ttl_times = 0;
                    data.ttl_sum = 0;
                    data.ttl_samples = 0;
                }
            }
            /* check time limit for every FIELDS job iteration or every 16 iterations for KEYS. */
            if ((iteration & time_check_mask) == 0) {
                if (elapsedUs(start) > (uint64_t)timelimit_us) {
                    state->timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
        } while (repeat);
    }

    long long elapsed = (long long)elapsedUs(start);
    /* NOTE(review): the 'db' argument here appears to be a trace-category
     * token consumed by the latencyTraceIfNeeded macro (the loop-local 'db'
     * variable is out of scope at this point) — confirm against the macro
     * definition. */
    if (jobType == KEYS) {
        latencyTraceIfNeeded(db, expire_cycle_keys, elapsed);
    } else if (jobType == FIELDS) {
        latencyTraceIfNeeded(db, expire_cycle_fields, elapsed);
    }

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired / total_sampled;
    } else
        current_perc = 0;
    *expired_stale_perc[jobType] = (current_perc * 0.05) + (*expired_stale_perc[jobType] * 0.95);

    return elapsed;
}

/* activeExpireCycle
 *
 * This function performs active expiration of both normal keys (with TTL)
 * and hash fields (with field-level TTL via volatile sets). Its purpose is to
 * reclaim memory from logically expired entries.
 *
 * The expiry is performed incrementally over multiple databases, respecting
 * a CPU time budget derived from the configured active-expire-effort.
 *
 * There are two separate expiry mechanisms for keys and for hash fields
 * because their iteration models are fundamentally different:
 * - key expiry operates on db->key entries, scanning random keys
 *   with attached TTL entries.
 * - field expiry operates on db->key->volatile_set entries, scanning
 *   fields within a hash that each have their own TTL.
 * This hierarchy and lookup pattern are entirely different, requiring
 * separate cursors, iteration logic, and data structure handling.
 *
 * The function uses an alternating scheme across event loop cycles: on one
 * cycle it will prioritize key expiry first, then hash field expiry if time
 * permits; on the next cycle, it will prioritize hash field expiry first,
 * then key expiry if time permits. This ensures fairness and prevents
 * starvation of either mechanism. Since the memory reclaim pace and iteration
 * model of keys versus hash fields are different and unpredictable,
 * alternating naturally balances the overall expiry effort when both are
 * fully consuming their available time budget. */
void activeExpireCycle(int type) {
    /* If 'expire' action is paused, for whatever reason, then don't expire any key.
     * Typically, at the end of the pause we will properly expire the key OR we
     * will have failed over and the new primary will send us the expire. */
    if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return;

    /* Adjust the running parameters according to the configured expire
     * effort. The default effort is 1, and the maximum configurable effort
     * is 10. Also make sure not to run fast cycles back to back. */
    long long timelimit_us;
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        long long config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION / 4 * activeExpireEffort();

        /* Never repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        static monotime last_fast_cycle_start_time; /* When last fast cycle ran. */
        monotime start = getMonotonicUs();
        if (start < last_fast_cycle_start_time + config_cycle_fast_duration * 2) return;

        last_fast_cycle_start_time = start;
        timelimit_us = config_cycle_fast_duration;
    } else {
        /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
         * time per iteration. Since this function gets called with a frequency of
         * server.hz times per second, the following is the max amount of
         * microseconds we can spend in this function. */
        int config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2 * activeExpireEffort();
        timelimit_us = config_cycle_slow_time_perc * 1000000 / server.hz / 100;
    }

    /* Alternation flag: which job (KEYS or FIELDS) runs first this cycle.
     * Flipped at the end of every call to ensure fairness (see the comment
     * above this function). */
    static bool expireCycleStartWithFields = 0;
    long long elapsed = 0;

    /* Try to smoke-out bugs (server.also_propagate should be empty here) */
    serverAssert(server.also_propagate.numops == 0);

    /* Each job receives whatever budget remains after the previous one;
     * activeExpireCycleJob() returns 0 immediately on a non-positive budget. */
    if (expireCycleStartWithFields) {
        elapsed += activeExpireCycleJob(FIELDS, type, timelimit_us - elapsed);
        elapsed += activeExpireCycleJob(KEYS, type, timelimit_us - elapsed);
    } else {
        elapsed += activeExpireCycleJob(KEYS, type, timelimit_us - elapsed);
        elapsed += activeExpireCycleJob(FIELDS, type, timelimit_us - elapsed);
    }
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle", elapsed);
    latencyTraceIfNeeded(db, expire_cycle, elapsed);
    expireCycleStartWithFields = !expireCycleStartWithFields;
}

/*-----------------------------------------------------------------------------
 * Expires of keys created in writable replicas
 *
 * Normally replicas do not process expires: they wait for the primaries to
 * synthesize DEL operations in order to retain consistency. However writable
 * replicas are an exception: if a key is created in the replica and an expire
 * is assigned to it, we need a way to expire such a key, since the primary
 * does not know anything about such a key.
 *
 * In order to do so, we track keys created in the replica side with an expire
 * set, and call the expireReplicaKeys() function from time to time in order to
 * reclaim the keys if they already expired.
 *
 * Note that the use case we are trying to cover here, is a popular one where
 * replicas are put in writable mode in order to compute slow operations in
 * the replica side that are mostly useful to actually read data in a more
 * processed way. Think at sets intersections in a tmp key, with an expire so
 * that it is also used as a cache to avoid intersecting every time.
 *
 * This implementation is currently not perfect but a lot better than leaking
 * the keys as implemented in 3.2.
 *----------------------------------------------------------------------------*/

/* The dictionary where we remember key names and database ID of keys we may
 * want to expire from the replica. Since this function is not often used we
 * don't even care to initialize the database at startup. We'll do it once
 * the feature is used the first time, that is, when rememberReplicaKeyWithExpire()
 * is called.
 *
 * The dictionary has an SDS string representing the key as the hash table
 * key, while the value is a 64 bit unsigned integer with the bits corresponding
 * to the DB where the keys may exist set to 1. Currently the keys created
 * with a DB id > 63 are not expired, but a trivial fix is to set the bitmap
 * to the max 64 bit unsigned value when we know there is a key with a DB
 * ID greater than 63, and check all the configured DBs in such a case. */
dict *replicaKeysWithExpire = NULL;

/* Check the set of keys created in the writable replica with an expire set,
 * in order to check if they should be evicted.
 *
 * Samples random tracked keys, and for each one walks the per-DB bitmap
 * stored as the dict entry value, trying to expire the key in every DB
 * where it was marked. Time-bounded and best-effort. */
void expireReplicaKeys(void) {
    if (replicaKeysWithExpire == NULL || dictSize(replicaKeysWithExpire) == 0) return;

    int cycles = 0, noexpire = 0;
    mstime_t start = mstime();
    while (1) {
        /* The dict is non-empty here (checked above and at the bottom of
         * the loop), so a random entry always exists. */
        dictEntry *de = dictGetRandomKey(replicaKeysWithExpire);
        sds keyname = dictGetKey(de);
        uint64_t dbids = dictGetUnsignedIntegerVal(de);
        uint64_t new_dbids = 0;

        /* Check the key against every database corresponding to the
         * bits set in the value bitmap. */
        int dbid = 0;
        while (dbids && dbid < server.dbnum) {
            if ((dbids & 1) != 0) {
                serverDb *db = server.db[dbid];
                int didx = getKVStoreIndexForKey(keyname);
                robj *expire = db == NULL ? NULL : dbFindExpiresWithDictIndex(db, keyname, didx);
                int expired = 0;

                if (expire && activeExpireCycleTryExpire(db, expire, start, didx)) {
                    expired = 1;
                    /* Propagate the DEL (writable replicas do not propagate anything to other replicas,
                     * but they might propagate to AOF) and trigger module hooks. */
                    postExecutionUnitOperations();
                }

                /* If the key was not expired in this DB, we need to set the
                 * corresponding bit in the new bitmap we set as value.
                 * At the end of the loop if the bitmap is zero, it means we
                 * no longer need to keep track of this key. */
                if (expire && !expired) {
                    noexpire++;
                    new_dbids |= (uint64_t)1 << dbid;
                }
            }
            dbid++;
            dbids >>= 1;
        }

        /* Set the new bitmap as value of the key, in the dictionary
         * of keys with an expire set directly in the writable replica. Otherwise
         * if the bitmap is zero, we no longer need to keep track of it. */
        if (new_dbids)
            dictSetUnsignedIntegerVal(de, new_dbids);
        else
            dictDelete(replicaKeysWithExpire, keyname);

        /* Stop conditions: found 3 keys we can't expire in a row or
         * time limit was reached. */
        cycles++;
        if (noexpire > 3) break;
        if ((cycles % 64) == 0 && mstime() - start > 1) break;
        if (dictSize(replicaKeysWithExpire) == 0) break;
    }
}

/* Track keys that received an EXPIRE or similar command in the context
 * of a writable replica.
 *
 * Records 'key' in the lazily-created replicaKeysWithExpire dict and sets
 * the bit for db->id in the entry's 64-bit bitmap value. Keys in a DB with
 * id > 63 are not tracked (the bitmap only has 64 bits — see the comment
 * on replicaKeysWithExpire). */
void rememberReplicaKeyWithExpire(serverDb *db, robj *key) {
    if (replicaKeysWithExpire == NULL) {
        static dictType dt = {
            dictSdsHash,       /* hash function */
            NULL,              /* key dup */
            dictSdsKeyCompare, /* key compare */
            dictSdsDestructor, /* key destructor */
            NULL,              /* val destructor */
            NULL               /* allow to expand */
        };
        replicaKeysWithExpire = dictCreate(&dt);
    }
    /* Bitmap can't represent DBs beyond 63; such keys are simply not tracked. */
    if (db->id > 63) return;

    dictEntry *de = dictAddOrFind(replicaKeysWithExpire, key->ptr);
    /* If the entry was just created, set it to a copy of the SDS string
     * representing the key: we don't want to need to take those keys
     * in sync with the main DB. The keys will be removed by expireReplicaKeys()
     * as it scans to find keys to remove. */
    if (dictGetKey(de) == key->ptr) {
        dictSetKey(replicaKeysWithExpire, de, sdsdup(key->ptr));
        dictSetUnsignedIntegerVal(de, 0);
    }

    /* Merge the new DB bit into the existing bitmap. */
    uint64_t dbids = dictGetUnsignedIntegerVal(de);
    dbids |= (uint64_t)1 << db->id;
    dictSetUnsignedIntegerVal(de, dbids);
}

/* Return the number of replica-created keys with an expire that we are
 * currently tracking (0 if the tracking dict was never created). */
size_t getReplicaKeyWithExpireCount(void) {
    return replicaKeysWithExpire ? dictSize(replicaKeysWithExpire) : 0;
}

/* Drop the whole replica-keys-with-expire tracking table. We need to do
 * that when data is flushed from the server: we may receive new keys from
 * the primary with the same name/db, and it is no longer a good idea to
 * expire them. If 'async' is non-zero the dict is released lazily.
 *
 * Note: technically we should handle the case of a single DB being flushed,
 * but it is not worth it since anyway race conditions using the same set
 * of key names in a writable replica and in its primary will lead to
 * inconsistencies. This is just a best-effort thing we do. */
void flushReplicaKeysWithExpireList(int async) {
    if (replicaKeysWithExpire == NULL) return;

    if (async)
        freeReplicaKeysWithExpireAsync(replicaKeysWithExpire);
    else
        dictRelease(replicaKeysWithExpire);
    replicaKeysWithExpire = NULL;
}

/* Return non-zero if a TTL of 'when' (ms unix time) should be treated as
 * already expired and turned into a deletion, zero if the key should be
 * kept with the (possibly past) expire time attached.
 *
 * EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
 * should never be executed as a DEL when loading the AOF or in the context
 * of a replica instance: instead we keep the already expired key with its
 * expire time and wait for an explicit DEL from the primary. The same
 * applies when the server is a primary in import mode (DEL comes from the
 * import source), or when receiving the key from a slot migration (DEL is
 * propagated by the migration source). */
int checkAlreadyExpired(long long when) {
    if (server.current_client && server.current_client->slot_migration_job) return 0;
    if (when > commandTimeSnapshot()) return 0;
    if (server.loading || server.primary_host || server.import_mode) return 0;
    return 1;
}

/* Parse the additional flags of expire commands, scanning c->argv from
 * index 3 up to (but not including) max_args. Sets the corresponding bits
 * in *flags and replies with an error (returning C_ERR) on an unknown
 * option or an incompatible combination; returns C_OK on success.
 *
 * Supported flags:
 * - NX: set expiry only when the key has no expiry
 * - XX: set expiry only when the key has an existing expiry
 * - GT: set expiry only when the new expiry is greater than current one
 * - LT: set expiry only when the new expiry is less than current one */
int parseExtendedExpireArgumentsOrReply(client *c, int *flags, int max_args) {
    int nx = 0, xx = 0, gt = 0, lt = 0;

    for (int j = 3; j < max_args; j++) {
        char *opt = c->argv[j]->ptr;
        if (!strcasecmp(opt, "nx")) {
            nx = 1;
            *flags |= EXPIRE_NX;
        } else if (!strcasecmp(opt, "xx")) {
            xx = 1;
            *flags |= EXPIRE_XX;
        } else if (!strcasecmp(opt, "gt")) {
            gt = 1;
            *flags |= EXPIRE_GT;
        } else if (!strcasecmp(opt, "lt")) {
            lt = 1;
            *flags |= EXPIRE_LT;
        } else {
            addReplyErrorFormat(c, "Unsupported option %s", opt);
            return C_ERR;
        }
    }

    /* NX is exclusive with every other flag. */
    if (nx && (xx || gt || lt)) {
        addReplyError(c, "NX and XX, GT or LT options at the same time are not compatible");
        return C_ERR;
    }

    /* GT and LT are mutually exclusive. */
    if (gt && lt) {
        addReplyError(c, "GT and LT options at the same time are not compatible");
        return C_ERR;
    }

    return C_OK;
}

/* Convert the TTL/timestamp object 'arg' into an absolute unix time in
 * milliseconds, stored in '*unixtime'.
 *
 * 'basetime' is 0 for *AT style callers (the argument is already absolute)
 * or the current time in milliseconds for relative expires; it is assumed
 * to be non-negative. 'unit' (UNIT_SECONDS or UNIT_MILLISECONDS) applies
 * only to 'arg'; 'basetime' is always in milliseconds.
 *
 * Returns C_OK on success. On a non-integer argument, a negative value, or
 * an overflow, an error is replied to the client and C_ERR is returned. */
int convertExpireArgumentToUnixTime(client *c, robj *arg, long long basetime, int unit, long long *unixtime) {
    long long when;
    if (getLongLongFromObjectOrReply(c, arg, &when, NULL) != C_OK) return C_ERR;

    /* Unlike expireGenericCommand(), negative values are rejected outright. */
    if (when < 0) {
        addReplyErrorExpireTime(c);
        return C_ERR;
    }

    if (unit == UNIT_SECONDS) {
        /* 'when' is non-negative here, so only the upper bound can overflow
         * during the seconds -> milliseconds conversion. (The original
         * 'when < LLONG_MIN / 1000' check was unreachable after the
         * negative-value guard above.) */
        if (when > LLONG_MAX / 1000) {
            addReplyErrorExpireTime(c);
            return C_ERR;
        }
        when *= 1000;
    }
    /* Detect overflow when adding the base time. */
    if (when > LLONG_MAX - basetime) {
        addReplyErrorExpireTime(c);
        return C_ERR;
    }
    when += basetime;
    debugServerAssert(unixtime);
    *unixtime = when;
    return C_OK;
}

/*-----------------------------------------------------------------------------
 * Expires Commands
 *----------------------------------------------------------------------------*/

/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the command second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds.
 *
 * Additional flags are supported and parsed via parseExtendedExpireArguments */
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    long long current_expire = -1;
    int flag = 0;

    /* checking optional flags (NX/XX/GT/LT in argv[3..argc-1]) */
    if (parseExtendedExpireArgumentsOrReply(c, &flag, c->argc) != C_OK) {
        return;
    }

    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK) return;

    /* EXPIRE allows negative numbers, but we can at least detect an
     * overflow by either unit conversion or basetime addition. */
    if (unit == UNIT_SECONDS) {
        if (when > LLONG_MAX / 1000 || when < LLONG_MIN / 1000) {
            addReplyErrorExpireTime(c);
            return;
        }
        when *= 1000;
    }

    if (when > LLONG_MAX - basetime) {
        addReplyErrorExpireTime(c);
        return;
    }
    when += basetime;

    robj *obj = lookupKeyWrite(c->db, key);

    /* No key, return zero. */
    if (obj == NULL) {
        addReply(c, shared.czero);
        return;
    }

    /* Only fetch the current expire when a conditional flag was given. */
    if (flag) {
        current_expire = objectGetExpire(obj);

        /* NX option is set, check current expiry */
        if (flag & EXPIRE_NX) {
            if (current_expire != -1) {
                addReply(c, shared.czero);
                return;
            }
        }

        /* XX option is set, check current expiry */
        if (flag & EXPIRE_XX) {
            if (current_expire == -1) {
                /* reply 0 when the key has no expiry */
                addReply(c, shared.czero);
                return;
            }
        }

        /* GT option is set, check current expiry */
        if (flag & EXPIRE_GT) {
            /* When current_expire is -1, we consider it as infinite TTL,
             * so expire command with gt always fail the GT. */
            if (when <= current_expire || current_expire == -1) {
                /* reply 0 when the new expiry is not greater than current */
                addReply(c, shared.czero);
                return;
            }
        }

        /* LT option is set, check current expiry */
        if (flag & EXPIRE_LT) {
            /* When current_expire -1, we consider it as infinite TTL,
             * but 'when' can still be negative at this point, so if there is
             * an expiry on the key and it's not less than current, we fail the LT. */
            if (current_expire != -1 && when >= current_expire) {
                /* reply 0 when the new expiry is not less than current */
                addReply(c, shared.czero);
                return;
            }
        }
    }

    /* A timestamp in the past on a primary (outside of loading/import/slot
     * migration) is executed as an immediate delete and propagated as such. */
    if (checkAlreadyExpired(when)) {
        deleteExpiredKeyFromOverwriteAndPropagate(c, key);
        addReply(c, shared.cone);
        return;
    } else {
        /* setExpire may return a different object pointer, so reassign. */
        obj = setExpire(c, c->db, key, when);
        signalModifiedKey(c, c->db, key);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
        server.dirty++;
        addReply(c, shared.cone);
        /* Propagate as PEXPIREAT millisecond-timestamp
         * Only rewrite the command arg if not already PEXPIREAT */
        if (c->cmd->proc != pexpireatCommand) {
            rewriteClientCommandArgument(c, 0, shared.pexpireat);
        }

        /* Avoid creating a string object when it's the same as argv[2]:
         * when basetime is 0 and the unit is already milliseconds, argv[2]
         * is already the absolute millisecond timestamp. */
        if (basetime != 0 || unit == UNIT_SECONDS) {
            robj *when_obj = createStringObjectFromLongLong(when);
            rewriteClientCommandArgument(c, 2, when_obj);
            decrRefCount(when_obj);
        }
        return;
    }
}

/* EXPIRE key seconds [ NX | XX | GT | LT]
 * Relative TTL in seconds, based on the current command time. */
void expireCommand(client *c) {
    expireGenericCommand(c, commandTimeSnapshot(), UNIT_SECONDS);
}

/* EXPIREAT key unix-time-seconds [ NX | XX | GT | LT]
 * Absolute unix timestamp in seconds (basetime 0). */
void expireatCommand(client *c) {
    expireGenericCommand(c, 0, UNIT_SECONDS);
}

/* PEXPIRE key milliseconds [ NX | XX | GT | LT]
 * Relative TTL in milliseconds, based on the current command time. */
void pexpireCommand(client *c) {
    expireGenericCommand(c, commandTimeSnapshot(), UNIT_MILLISECONDS);
}

/* PEXPIREAT key unix-time-milliseconds [ NX | XX | GT | LT]
 * Absolute unix timestamp in milliseconds (basetime 0); this is also the
 * form the other expire commands are rewritten to for propagation. */
void pexpireatCommand(client *c) {
    expireGenericCommand(c, 0, UNIT_MILLISECONDS);
}

/* Implements TTL, PTTL, EXPIRETIME and PEXPIRETIME.
 *
 * 'output_ms' selects milliseconds (1) or rounded seconds (0) in the reply;
 * 'output_abs' selects an absolute unix timestamp (1) or a remaining TTL (0).
 *
 * Replies -2 when the key does not exist and -1 when it has no expire. */
void ttlGenericCommand(client *c, int output_ms, int output_abs) {
    /* LOOKUP_NOTOUCH: a TTL query must not affect LRU/LFU bookkeeping. */
    robj *o = lookupKeyReadWithFlags(c->db, c->argv[1], LOOKUP_NOTOUCH);
    if (o == NULL) {
        addReplyLongLong(c, -2);
        return;
    }

    long long expire = objectGetExpire(o);
    if (expire == -1) {
        addReplyLongLong(c, -1);
        return;
    }

    long long remaining = output_abs ? expire : expire - commandTimeSnapshot();
    if (remaining < 0) remaining = 0;
    /* Round (rather than truncate) when converting to seconds. */
    addReplyLongLong(c, output_ms ? remaining : ((remaining + 500) / 1000));
}

/* TTL key
 * Remaining time to live in (rounded) seconds. */
void ttlCommand(client *c) {
    ttlGenericCommand(c, 0, 0);
}

/* PTTL key
 * Remaining time to live in milliseconds. */
void pttlCommand(client *c) {
    ttlGenericCommand(c, 1, 0);
}

/* EXPIRETIME key
 * Absolute unix expiration timestamp in (rounded) seconds. */
void expiretimeCommand(client *c) {
    ttlGenericCommand(c, 0, 1);
}

/* PEXPIRETIME key
 * Absolute unix expiration timestamp in milliseconds. */
void pexpiretimeCommand(client *c) {
    ttlGenericCommand(c, 1, 1);
}

/* PERSIST key
 * Removes the expiration from a key. Replies 1 when an expire was actually
 * removed, 0 when the key is missing or had no expire. */
void persistCommand(client *c) {
    robj *key = c->argv[1];

    if (lookupKeyWrite(c->db, key) == NULL || !removeExpire(c->db, key)) {
        addReply(c, shared.czero);
        return;
    }

    signalModifiedKey(c, c->db, key);
    notifyKeyspaceEvent(NOTIFY_GENERIC, "persist", key, c->db->id);
    addReply(c, shared.cone);
    server.dirty++;
}

/* TOUCH key1 [key2 key3 ... keyN]
 * Looks up each key (updating its access time as a side effect of the read
 * lookup) and replies with the number of keys that exist. */
void touchCommand(client *c) {
    long long existing = 0;

    for (int j = 1; j < c->argc; j++) {
        if (lookupKeyRead(c->db, c->argv[j]) != NULL) existing++;
    }
    addReplyLongLong(c, existing);
}

/* Returns true if the provided timestamp represents an expired time, false
 * otherwise. A negative value means no expiration. */
bool timestampIsExpired(mstime_t when) {
    /* Negative timestamps encode "no expire". */
    if (when < 0) return false;

    /* Expired only once the current (virtual or real) time has moved
     * strictly past 'when'. */
    return commandTimeSnapshot() > when;
}

/* This function verifies if the current conditions allow expiration of keys and fields.
 * For some cases expiration is not allowed, but we would still like to ignore the key
 * so to treat it as "expired" without actively deleting it.
 *
 * Returns one of:
 *   POLICY_IGNORE_EXPIRE  - treat the key as alive (don't hide, don't delete)
 *   POLICY_KEEP_EXPIRED   - treat the key as missing, but don't delete it
 *   POLICY_DELETE_EXPIRED - the caller may actively delete the expired key
 *
 * Note: the branch order matters; replica status takes precedence over
 * slot-migration and import-mode handling. */
expirationPolicy getExpirationPolicyWithFlags(int flags) {
    /* While loading (AOF/RDB) never expire: the dataset must be restored
     * as-is and deletions will arrive explicitly. */
    if (server.loading) return POLICY_IGNORE_EXPIRE;

    /* If we are running in the context of a replica, instead of
     * evicting the expired key from the database, we return ASAP:
     * the replica key expiration is controlled by the primary that will
     * send us synthesized DEL operations for expired keys. The
     * exception is when write operations are performed on writable
     * replicas.
     *
     * Still we try to reflect the correct state to the caller,
     * that is, POLICY_KEEP_EXPIRED so that the key will be ignored, but not deleted.
     *
     * When replicating commands from the primary, keys are never considered
     * expired, so we return POLICY_IGNORE_EXPIRE */
    if (server.primary_host != NULL) {
        if (server.current_client && (server.current_client->flag.primary)) return POLICY_IGNORE_EXPIRE;
        if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return POLICY_KEEP_EXPIRED;
    } else if (server.current_client && server.current_client->slot_migration_job) {
        /* Slot migration client should be treated like a primary */
        return POLICY_IGNORE_EXPIRE;
    } else if (server.import_mode) {
        /* If we are running in the import mode on a primary, instead of
         * evicting the expired key from the database, we return ASAP:
         * the key expiration is controlled by the import source that will
         * send us synthesized DEL operations for expired keys. The
         * exception is when write operations are performed on this server
         * because it's a primary.
         *
         * Notice: other clients, apart from the import source, should not access
         * the data imported by import source.
         *
         * Still we try to reflect the correct state to the caller,
         * that is, POLICY_KEEP_EXPIRED so that the key will be ignored, but not deleted.
         *
         * When receiving commands from the import source, keys are never considered
         * expired, so we return POLICY_IGNORE_EXPIRE */
        if (server.current_client && (server.current_client->flag.import_source)) return POLICY_IGNORE_EXPIRE;
        if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return POLICY_KEEP_EXPIRED;
    }

    /* In some cases we're explicitly instructed to return an indication of a
     * missing key without actually deleting it, even on primaries. */
    if (flags & EXPIRE_AVOID_DELETE_EXPIRED) return POLICY_KEEP_EXPIRED;

    /* If 'expire' action is paused, for whatever reason, then don't expire any key.
     * Typically, at the end of the pause we will properly expire the key OR we
     * will have failed over and the new primary will send us the expire. */
    if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return POLICY_KEEP_EXPIRED;

    return POLICY_DELETE_EXPIRED;
}
